import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = "retina"
np.random.seed(42) # remove this line and the generated data will differ on every run
def generate_dataset(mu, sigma, amount_of_datapoints, label_name):
    """Draw a 2-D Gaussian point cloud with a constant label.

    Parameters
    ----------
    mu : sequence of 2 floats -- per-feature means.
    sigma : sequence of 2 floats -- per-feature standard deviations.
    amount_of_datapoints : int -- number of samples to draw.
    label_name : label value assigned to every sample.

    Returns
    -------
    (X, label) : X has shape (amount_of_datapoints, 2); label has
    shape (amount_of_datapoints,) filled with `label_name`.
    """
    x1 = np.random.normal(mu[0], sigma[0], amount_of_datapoints)
    x2 = np.random.normal(mu[1], sigma[1], amount_of_datapoints)
    # Vectorized assembly instead of the original Python append-loop.
    X = np.column_stack((x1, x2))
    label = np.full(amount_of_datapoints, label_name)
    return X, label
# Task 1: build a two-class toy dataset -- two Gaussian blobs of 50
# points each -- and visualize it.
datapoints = 50
mu1 = [-4.0 , 1.0]
sigma1 = [ 1.2 , 0.8]
x1, label1 = generate_dataset(mu1, sigma1, datapoints, 0)
mu2 = [2.0 , 3.0]
sigma2 = [0.7 , 1.0]
x2, label2 = generate_dataset(mu2, sigma2, datapoints, 1)
# Stack both blobs into one (100, 2) matrix and one label vector.
X = np.concatenate((x1, x2), axis=0)
labels = np.concatenate((label1, label2), axis=0)
X.shape, labels.shape  # notebook display of the shapes
d = {"x1": X[:,0], "x2": X[:,1], "label": labels}
df = pd.DataFrame(data=d)
df  # notebook display of the DataFrame
plt.scatter(df.x1[df.label == 0], df.x2[df.label == 0], color = "red", label="Class: 0", s=5)
plt.scatter(df.x1[df.label == 1], df.x2[df.label == 1], color = "green", label="Class: 1", s=5)
plt.legend()
plt.show()
Gradient descent has fit the points perfectly: 100% accuracy was achieved after 10 iterations with a learning rate of 1 and no regularisation. The cost-minimisation curve also has a nice convex shape and converges to a good minimum.
# Task 2: shuffle the dataset and split it into the feature matrix X
# and a column label vector, both as numpy arrays.
df_task2 = df.sample(frac=1).reset_index(drop=True)  # shuffled copy
df_task2.head()
X = df_task2.drop("label", axis=1)
X = X.to_numpy()
print(X.shape)
X[:10]  # notebook display of the first rows
label = df_task2["label"].values
label = label.reshape(-1,1)  # column vector of shape (m, 1)
print(label.shape)
label[:10]
$g(z) = \frac{1}{1 + e^{-z}}$
def sigmoid(x):
    """Logistic function g(z) = 1 / (1 + e^(-z)), applied element-wise."""
    z = np.exp(-x)
    return 1.0 / (1.0 + z)
def cost_function(theta, X, y):
    """Binary cross-entropy cost and gradient for logistic regression.

    Parameters
    ----------
    theta : ndarray (n, 1) -- weights (bias weight included, X must
        carry a bias column).
    X : ndarray (m, n) -- design matrix.
    y : ndarray (m, 1) -- labels in {0, 1}.

    Returns
    -------
    (cost, grad) : scalar cost and gradient of shape (n, 1).

    NOTE: h is not clamped here, so a saturated sigmoid (h == 0 or
    h == 1) yields log(0); see cost_function_regularized for the
    clamped variant.
    """
    m = len(y)
    h = sigmoid(X @ theta)
    # np.sum instead of the builtin sum() -> scalar cost directly,
    # no trailing cost[0] indexing needed.
    cost = -(1 / m) * np.sum(y * np.log(h) + (1 - y) * np.log(1 - h))
    grad = (1 / m) * X.T @ (h - y)
    return cost, grad
def cost_function_regularized(theta, X, y, lambd):
    """L2-regularized binary cross-entropy cost and gradient.

    The bias weight theta[0] is excluded from the regularization, so
    its gradient component stays unregularized as well.

    Parameters
    ----------
    theta : ndarray (n, 1) -- weights (caller's array is not modified).
    X : ndarray (m, n) -- design matrix with bias column.
    y : ndarray (m, 1) -- labels in {0, 1}.
    lambd : float -- regularization strength.

    Returns
    -------
    (regCost, gradReg) : scalar regularized cost, gradient (n, 1).
    """
    m = len(y)
    h = sigmoid(X @ theta)
    # Clamp a saturated sigmoid at BOTH ends: log(1 - h) is -inf for
    # h == 1 and log(h) is -inf for h == 0 (the original only handled
    # the h == 1 side, and its comment blamed "log(1)" incorrectly).
    h[h == 1] = 0.999
    h[h == 0] = 0.001
    cost = -(1 / m) * np.sum(y * np.log(h) + (1 - y) * np.log(1 - h))
    temp_theta = theta.copy()  # copy so the caller's theta is untouched
    temp_theta[0] = 0          # bias is not regularized
    regCost = cost + (lambd / (2 * m)) * (temp_theta.T @ temp_theta).item()
    # Because temp_theta[0] == 0, the bias component of the penalty term
    # vanishes automatically -- no need to compute the gradient twice
    # just to restore grad[0], as the original did.
    gradReg = (1 / m) * (X.T @ (h - y)) + (lambd / m) * temp_theta
    return regCost, gradReg
def plot_decision_boundary(df, theta, size=0.5):
    """Scatter the normalized data and draw the decision line.

    The boundary is theta0 + theta1*x1 + theta2*x2 = 0, solved for x2
    at the min and max of the normalized x1 feature.
    """
    x_values = [np.min(df.x1_norm), np.max(df.x1_norm)]
    y_values = -(theta[0] + theta[1] * x_values) / theta[2]
    plt.figure()
    for cls, col in ((0, "red"), (1, "green")):
        mask = df.label == cls
        plt.scatter(df.x1_norm[mask], df.x2_norm[mask], color=col, label=f"Class: {cls}", s=size)
    plt.plot(x_values, y_values, "b", label="Decision Boundary")
    plt.legend()
    plt.xlim([-3, 3])
    plt.ylim([-4, 4])
    plt.show()
def plot_cost_history(cost_history):
    """Plot the cost value recorded at each gradient-descent iteration."""
    plt.figure()
    plt.plot(cost_history)
    plt.title("Cost-Function with Gradient Descent")
    plt.xlabel("Iteration")
    plt.ylabel("J(0)")
    plt.show()
def prediction(theta, X):
    """Hard 0/1 class prediction: 1 where X @ theta >= 0, else 0.

    A non-negative raw score corresponds to sigmoid(score) >= 0.5.
    Returns an int array with the same shape as X @ theta.
    """
    scores = X @ theta
    return (scores >= 0).astype(int)
def confusion_matrix_binary(act, pred):
    """Build a 2-class confusion table from actual vs. predicted labels.

    act, pred : 1-D arrays of 0/1 labels.
    Returns a dict with keys "TP", "FN", "FP", "TN".
    """
    # Cross-tabulation; margins=True appends an "All" row and column.
    m = pd.crosstab(act, pred, rownames=['Actual'], colnames=['Predicted'], margins=True)
    # If only one class was ever predicted, crosstab produces a single
    # class column (plus "All"); pad a zero column so indexing works.
    # NOTE(review): this branch turns m into an ndarray, where m[0]
    # selects a ROW, while in the DataFrame path m[0] selects the
    # COLUMN labelled 0 -- the two paths index differently; verify.
    if m.shape[1] == 2:
        m = np.append(np.zeros((3,1)), m,axis=1)
    print(m)
    # NOTE(review): with rows=Actual / cols=Predicted, m[0][1] is
    # (predicted 0, actual 1); calling it "FN" treats class 0 as the
    # positive class and may swap precision/recall downstream -- confirm.
    return {"TP": m[0][0], "FN": m[0][1], "FP": m[1][0], "TN": m[1][1]}
def print_binary_confusion_matrix(matrix):
    """Pretty-print a confusion-matrix dict with keys TP/FN/FP/TN."""
    TP, FN, FP, TN = (matrix.get(k) for k in ("TP", "FN", "FP", "TN"))
    print("-- Confusion Matrix ---")
    print(f" \__0___1__")
    print(f"0 | { TP } \t{ FN }")
    print(f"1 | { FP } \t{ TN }")
def calculate_metrics(matrix):
    """Compute accuracy and per-class precision/recall/F1.

    matrix : dict with keys "TP", "FN", "FP", "TN" (class 0 treated
    as positive for the *_0 metrics, class 1 for the *_1 metrics).
    Any metric whose denominator is 0 is reported as 0.0.
    Returns a dict with keys accuracy, recall_0/1, precision_0/1, f1_0/1.
    """
    TP = matrix.get("TP")
    FN = matrix.get("FN")
    FP = matrix.get("FP")
    TN = matrix.get("TN")

    def _ratio(num, den):
        # Shared zero-division guard; the original repeated it 6 times.
        return num / den if den > 0 else 0.0

    def _f1(recall, precision):
        # Harmonic mean of precision and recall.
        return _ratio(2 * recall * precision, recall + precision)

    accuracy = (TP + TN) / (TP + TN + FP + FN)
    recall_0 = _ratio(TP, TP + FN)
    precision_0 = _ratio(TP, TP + FP)
    recall_1 = _ratio(TN, FP + TN)
    precision_1 = _ratio(TN, TN + FN)
    return {"accuracy": accuracy,
            "recall_0": recall_0, "recall_1": recall_1,
            "precision_0": precision_0, "precision_1": precision_1,
            "f1_0": _f1(recall_0, precision_0), "f1_1": _f1(recall_1, precision_1)}
def print_metrics(m):
    """Print an accuracy / per-class precision-recall-F1 summary table."""
    acc, pre_0, pre_1, rec_0, rec_1, f1_0, f1_1 = (
        m.get(k) for k in ("accuracy", "precision_0", "precision_1",
                           "recall_0", "recall_1", "f1_0", "f1_1"))
    print("\n\t Precision \t Recall \t F1 ")
    print(f"Class 0: {pre_0:.4f} \t {rec_0:.4f} \t {f1_0:.4f}")
    print(f"Class 1: {pre_1:.4f} \t {rec_1:.4f} \t {f1_1:.4f}")
    print(f"\nAccuracy: {acc:.4f}")
def feature_normalization(X):
    """Z-score normalize each column of X.

    Returns (X_norm, X_mean, X_std). A constant column (std == 0)
    would divide by zero and yield NaN; its std is replaced by 1 so
    the column normalizes to 0 instead. The returned X_std reflects
    that replacement, so it can safely be reused to transform more
    data with the same statistics.
    """
    X_mean = np.mean(X, axis=0)
    X_std = np.std(X, axis=0)
    X_std = np.where(X_std == 0, 1.0, X_std)  # guard against /0
    X_normaliz = (X - X_mean) / X_std
    return X_normaliz, X_mean, X_std
# Normalize the features and keep the normalized columns in the
# DataFrame for plotting.
X, X_mean, X_std = feature_normalization(X)
df_task2["x1_norm"] = X[:,0]
df_task2["x2_norm"] = X[:,1]
df_task2.head()
m,n = X.shape
# Add the bias column (all ones) as the first feature.
X = np.append(np.ones((m,1)),X,axis=1)
# Initial theta: zeros, shape (n+1, 1) including the bias weight.
init_theta = np.zeros((n+1,1))
# Lambda for the regularization (0 = no regularization).
lambd = 0
cost, grad = cost_function_regularized(init_theta, X, label, lambd)
print("Cost mit dem inital Theta: " ,cost)
print("Gradient und initial Theta: " ,grad)
# NOTE(review): the GRADIENT is passed where a parameter vector is
# expected -- presumably just to visualize the starting point; confirm.
plot_decision_boundary(df_task2, grad, 3)
predicted = prediction(grad, X)
matrix_task2 = confusion_matrix_binary(label[:,0], predicted[:,0])
def gradient_descent(X, y, theta, alpha, iterations, lambd):
    """Batch gradient descent on the regularized logistic cost.

    Parameters
    ----------
    X : ndarray (m, n) -- design matrix with bias column.
    y : ndarray (m, 1) -- labels in {0, 1}.
    theta : ndarray (n, 1) -- initial weights (not modified in place).
    alpha : float -- learning rate.
    iterations : int -- maximum number of update steps.
    lambd : float -- L2 regularization strength.

    Returns
    -------
    (theta, cost_history) : final weights and per-iteration costs.

    Stops early once the cost improvement between two consecutive
    iterations drops below 1e-7; the check only kicks in after the
    first few iterations (same i > 5 guard as the original).
    """
    cost_history = []
    for i in range(iterations):
        cost, grad = cost_function_regularized(theta, X, y, lambd)
        theta = theta - alpha * grad
        cost_history.append(cost)
        # Compare against the PREVIOUS iteration's cost. The original
        # used cost_history[i - 1], which at i == 0 wraps around to the
        # just-appended element, and combined the conditions with the
        # bitwise '&' instead of boolean 'and'.
        if i > 5 and abs(cost_history[-2] - cost) < 1e-7:
            print("Stop after iteration: ", i)
            break
    return theta, cost_history
# Train with learning rate 1 for up to 10 iterations, no regularization,
# then visualize the boundary, predictions, and cost curve.
theta_task2, J_history = gradient_descent(X, label, init_theta, 1 , 10, 0)
print("Der regularized theta:\n",theta_task2)
plot_decision_boundary(df_task2, theta_task2, 3)
predicted = prediction(theta_task2, X)
matrix_task2 = confusion_matrix_binary(label[:,0], predicted[:,0])
plot_cost_history(J_history)
Gradient descent has fit the points perfectly: 100% accuracy was achieved after 10 iterations with a learning rate of 1 and no regularisation. The cost-minimisation curve also has a nice convex shape and converges to a good minimum.
# Task 3: ten datasets in which the class-1 cloud moves progressively
# closer to the class-0 cloud (x1 mean shrinks from 4.5 to 0).
df_task3 = [[]]*10 # 10 different datasets
# NOTE: [[]]*10 aliases one inner list, but every slot is reassigned
# in the loop below, so the aliasing is harmless here.
datapoints_class_0 = 9000
datapoints_class_1 = 10000
# Despite the name, this is the x1 MEAN of class 1: 4.5 -> 0.0 in 10 steps.
std_decreasing_class_1 = np.linspace(4.5, 0.0, num=10)
print(std_decreasing_class_1)
for i in range(len(df_task3)):
    mean_class_0 = [ 0.0 , 0.0]
    std_class_0 = [ 1.0 , 1.0]
    x_class_0, label_class_0 = generate_dataset(mean_class_0, std_class_0, datapoints_class_0, 0)
    # Class 1: x1 mean decreases per dataset, std stays fixed.
    mean_class_1 = [ std_decreasing_class_1[i] , 0.0]
    std_class_1 = [ 1.0 , 1.0]
    x_class_1, label_class_1 = generate_dataset(mean_class_1, std_class_1, datapoints_class_1, 1)
    X = np.concatenate((x_class_0, x_class_1), axis=0)
    labels = np.concatenate((label_class_0, label_class_1), axis=0)
    X_norm, X_mean, X_std = feature_normalization(X)
    df_task3[i] = pd.DataFrame(data= { "x1": X[:,0], "x2": X[:,1], "x1_norm": X_norm[:,0], "x2_norm": X_norm[:,1] , "label": labels} )
def plot_df(df):
    """Scatter a task-3 dataset's normalized features, colored by class."""
    plt.figure()
    for cls, col in ((0, "red"), (1, "green")):
        sel = df.label == cls
        plt.scatter(df.x1_norm[sel], df.x2_norm[sel], color=col, label=str(cls), s=0.5)
    plt.legend(["y = 0","y = 1"])
    plt.show()
# Show each of the ten datasets.
for i in range(len(df_task3)):
    print("Dataset Nr. ", i+1)
    plot_df(df_task3[i])
def get_X_label(df):
    """Return the normalized features (m, 2) and labels as an (m, 1) column."""
    features = np.column_stack((df.x1_norm, df.x2_norm))
    target = np.array(df.label)
    target = target.reshape(-1, 1)
    return features, target
# Train on all ten datasets, once without (lambda=0) and once with
# (lambda=10) regularization; report boundary, cost curve, and metrics.
gradient_descent_iterations = 100
for lambd in [0,10]:
    for i in range(len(df_task3)):
        print(f"-------------------------------------\nDataset Nr. {i+1} with Lambda of {lambd} \n")
        X , label = get_X_label(df_task3[i])
        m, n = X.shape
        # Bias column of ones as the first feature.
        X = np.append(np.ones((m,1)),X,axis=1)
        init_theta = np.zeros((n+1,1))
        theta, J_history = gradient_descent(X, label, init_theta, 1 , gradient_descent_iterations, lambd)
        plot_decision_boundary(df_task3[i], theta)
        plot_cost_history(J_history)
        predicted = prediction(theta, X)
        matrix = confusion_matrix_binary(label[:,0], predicted[:,0])
        #print_binary_confusion_matrix(matrix)
        print(" ")
        metrics = calculate_metrics(matrix)
        print_metrics(metrics)
        print(" ")
As expected, the decision boundary always tries to position itself as well as possible between the two point clouds. Interestingly, in the last dataset (No. 10, for both lambda = 10 and lambda = 0), where the two clouds lie exactly on top of each other, the decision boundary has no clear place to go and flies completely out of the picture.
# Task 4, feature set 1: predict "waterfront" from price/view/zipcode.
df_task4_train = pd.read_csv("./house_train_data.csv")
df_task4_test = pd.read_csv("./house_test_data.csv")
feature_set_1 = ["price", "view", "zipcode"]
selected_feature_set = feature_set_1
# Re-read of the training CSV (redundant with the read above).
df_task4_train = pd.read_csv("./house_train_data.csv")
x_train = df_task4_train[selected_feature_set]
y_train = df_task4_train["waterfront"]
print(f"Amound of train data: {len(y_train)} \n")
print("Waterfront Y_Train\n")
print(y_train.value_counts())
print("\nWaterfront X_Train")
x_train.head()
# DataFrame columns -> plain (m, n) numpy array.
x_train = np.column_stack([x_train.iloc[:, i] for i in range(0, x_train.shape[1])])
y_train = np.array(y_train)
y_train = y_train.reshape(-1,1)
x_train, X_mean, X_std = feature_normalization(x_train)
m, n = x_train.shape
# Bias column of ones as the first feature.
x_train = np.append(np.ones((m,1)), x_train ,axis=1) # x = np.insert(X, 0, 1, axis=1)
init_theta = np.zeros((n+1,1))
# ----- Test variables
df_task4_test = pd.read_csv("./house_test_data.csv")
x_test = df_task4_test[selected_feature_set]
y_test = df_task4_test["waterfront"]
print(f"Amound of test data: {len(y_test)} \n")
print("Waterfront Y_Test\n")
print(y_test.value_counts())
print("\nWaterfront X_Test")
x_test.head()
x_test = np.column_stack([x_test.iloc[:, i] for i in range(0, x_test.shape[1])])
y_test = np.array(y_test)
y_test = y_test.reshape(-1,1)
print(x_test.shape, y_test.shape)
# NOTE(review): x_test is normalized with ITS OWN mean/std (overwriting
# the training statistics) and never gets a bias column; the test split
# is never evaluated below -- confirm whether that is intended.
x_test, X_mean, X_std = feature_normalization(x_test)
theta, J_history = gradient_descent(x_train, y_train, init_theta, 0.01 , 4000, 0.5 )
plot_cost_history(J_history)
# Metrics are computed on the TRAINING data only.
predicted = prediction(theta, x_train)
matrix = confusion_matrix_binary(y_train[:,0], predicted[:,0])
print(" ")
metrics = calculate_metrics(matrix)
print_metrics(metrics)
# Task 4, feature set 2: same pipeline with only "view" and "zipcode".
feature_set_2 = ["view", "zipcode"]
selected_feature_set = feature_set_2
df_task4_train = pd.read_csv("./house_train_data.csv")
x_train = df_task4_train[selected_feature_set]
y_train = df_task4_train["waterfront"]
print(f"Amound of train data: {len(y_train)} \n")
print("Waterfront Y_Train\n")
print(y_train.value_counts())
print("\nWaterfront X_Train")
x_train.head()
# DataFrame columns -> plain (m, n) numpy array.
x_train = np.column_stack([x_train.iloc[:, i] for i in range(0, x_train.shape[1])])
y_train = np.array(y_train)
y_train = y_train.reshape(-1,1)
x_train, X_mean, X_std = feature_normalization(x_train)
m, n = x_train.shape
# Bias column of ones as the first feature.
x_train = np.append(np.ones((m,1)), x_train ,axis=1) # x = np.insert(X, 0, 1, axis=1)
init_theta = np.zeros((n+1,1))
# ----- Test variables
df_task4_test = pd.read_csv("./house_test_data.csv")
x_test = df_task4_test[selected_feature_set]
y_test = df_task4_test["waterfront"]
print(f"Amound of test data: {len(y_test)} \n")
print("Waterfront Y_Test\n")
print(y_test.value_counts())
print("\nWaterfront X_Test")
x_test.head()
x_test = np.column_stack([x_test.iloc[:, i] for i in range(0, x_test.shape[1])])
y_test = np.array(y_test)
y_test = y_test.reshape(-1,1)
print(x_test.shape, y_test.shape)
# NOTE(review): x_test is normalized with ITS OWN mean/std (overwriting
# the training statistics), never gets a bias column, and the test
# split is never evaluated below -- confirm whether that is intended.
x_test, X_mean, X_std = feature_normalization(x_test)
theta, J_history = gradient_descent(x_train, y_train, init_theta, 6, 1000, 0.3 )
plot_cost_history(J_history)
# Metrics are computed on the TRAINING data only.
predicted = prediction(theta, x_train)
matrix = confusion_matrix_binary(y_train[:,0], predicted[:,0])
metrics = calculate_metrics(matrix)
print_metrics(metrics)
# Task 4, feature set 3: same pipeline with a large feature set.
feature_set_3 = ["price", "view", "zipcode", "grade", "bedrooms", "bathrooms", "sqft_living","sqft_lot", "sqft_above", "sqft_basement"]
selected_feature_set = feature_set_3
df_task4_train = pd.read_csv("./house_train_data.csv")
x_train = df_task4_train[selected_feature_set]
y_train = df_task4_train["waterfront"]
print(f"Amound of train data: {len(y_train)} \n")
print("Waterfront Y_Train\n")
print(y_train.value_counts())
print("\nWaterfront X_Train")
x_train.head()
# DataFrame columns -> plain (m, n) numpy array.
x_train = np.column_stack([x_train.iloc[:, i] for i in range(0, x_train.shape[1])])
y_train = np.array(y_train)
y_train = y_train.reshape(-1,1)
x_train, X_mean, X_std = feature_normalization(x_train)
m, n = x_train.shape
# Bias column of ones as the first feature.
x_train = np.append(np.ones((m,1)), x_train ,axis=1) # x = np.insert(X, 0, 1, axis=1)
init_theta = np.zeros((n+1,1))
# ----- Test variables
df_task4_test = pd.read_csv("./house_test_data.csv")
x_test = df_task4_test[selected_feature_set]
y_test = df_task4_test["waterfront"]
print(f"Amound of test data: {len(y_test)} \n")
print("Waterfront Y_Test\n")
print(y_test.value_counts())
print("\nWaterfront X_Test")
x_test.head()
x_test = np.column_stack([x_test.iloc[:, i] for i in range(0, x_test.shape[1])])
y_test = np.array(y_test)
y_test = y_test.reshape(-1,1)
print(x_test.shape, y_test.shape)
# NOTE(review): x_test is normalized with ITS OWN mean/std (overwriting
# the training statistics), never gets a bias column, and the test
# split is never evaluated below -- confirm whether that is intended.
x_test, X_mean, X_std = feature_normalization(x_test)
theta, J_history = gradient_descent(x_train, y_train, init_theta, 0.5 , 1000, 0.1 )
plot_cost_history(J_history)
# Metrics are computed on the TRAINING data only.
predicted = prediction(theta, x_train)
matrix = confusion_matrix_binary(y_train[:,0], predicted[:,0])
print(" ")
metrics = calculate_metrics(matrix)
print_metrics(metrics)
# Task 5: four Gaussian point clouds, 300 points each, labels 0-3.
datapoints = 300
mu1 = [-4.0 , 1.0]
sigma1 = [ 1.2 , 0.8]
x1, label1 = generate_dataset(mu1, sigma1, datapoints, 0)
mu2 = [2.0 , 3.0]
sigma2 = [0.7 , 1.0]
x2, label2 = generate_dataset(mu2, sigma2, datapoints, 1)
mu3 = [-2.0 , 6.0]
sigma3 = [0.7 , 1.0]
x3, label3 = generate_dataset(mu3, sigma3, datapoints, 2)
mu4 = [0.2 , -2.3]
sigma4 = [0.7 , 0.5]
x4, label4 = generate_dataset(mu4, sigma4, datapoints, 3)
# Stack all four clouds into one (1200, 2) matrix and label vector.
X = np.concatenate((x1, x2, x3, x4), axis=0)
labels = np.concatenate((label1, label2, label3, label4), axis=0)
print(X.shape, labels.shape)
d = {"x1": X[:,0], "x2": X[:,1], "label": labels}
df = pd.DataFrame(data=d)
df  # notebook display
# Scatter plot of the four generated classes.
plt.scatter(df.x1[df.label == 0], df.x2[df.label == 0], color = "red", label="Class: 0", s=5)
plt.scatter(df.x1[df.label == 1], df.x2[df.label == 1], color = "green", label="Class: 1", s=5)
plt.scatter(df.x1[df.label == 2], df.x2[df.label == 2], color = "blue", label="Class: 2", s=5)
# BUG FIX: the purple cloud is class 3 but its legend said "Class: 2".
plt.scatter(df.x1[df.label == 3], df.x2[df.label == 3], color = "purple", label="Class: 3", s=5)
plt.legend()
plt.show()
# Features as a numpy array, then z-score normalization.
X = df.drop("label", axis=1)
X = X.to_numpy()
print(X.shape)
X, X_mean, X_std = feature_normalization(X)
X  # notebook display
def oneVsAll(x, y, lr=0.1 , max_iter=400, lambd = 1):
    """Train one-vs-all logistic-regression classifiers.

    Parameters
    ----------
    x : ndarray (m, n) -- features WITHOUT the bias column (added here).
    y : array of class labels (any number of classes).
    lr, max_iter, lambd : hyperparameters passed to gradient_descent.

    Returns
    -------
    (thetas, classes, J_history) : list of (n+1, 1) weight vectors, one
    per class aligned with `classes`; the unique class labels; and a
    cost history.
    NOTE(review): J_history holds only the LAST class's history -- the
    loop overwrites it each iteration; collect per-class histories if
    all of them are needed.
    """
    x = np.insert(x, 0, 1, axis=1)  # bias column
    thetas = []
    classes = np.unique(y)
    for c in classes:
        # One-vs-rest: current class -> 1, every other class -> 0.
        binary_y = np.where(y == c, 1, 0)
        binary_y = binary_y.reshape(-1,1)
        init_theta = np.zeros((x.shape[1],1))
        theta, J_history = gradient_descent(x, binary_y, init_theta, lr , max_iter, lambd )
        thetas.append(theta)
    return thetas, classes, J_history
# Train one binary classifier per class (one-vs-all).
thetas, classes, cost_history = oneVsAll(X, labels)
print(thetas)
# Scatter the data colored by class, then overlay each class's
# decision boundary as a dashed line.
plt.scatter(X[:, 0], X[:, 1], c=labels, alpha=0.5)
# x-range of the boundary segments, hoisted out of the loop.
# BUG FIX: the original used X[:, 1].max() (the x2 maximum) as the
# upper end of the x1 range.
j = np.array([X[:, 0].min(), X[:, 0].max()])
plt.xlim([-3, 3])
plt.ylim([-4, 4])
for theta in thetas:
    # Solve theta0 + theta1*x1 + theta2*x2 = 0 for x2.
    k = -(j * theta[1] + theta[0]) / theta[2]
    plt.plot(j, k, color='k', linestyle="--")
def predict_classes(classes, thetas, x):
    """Predict multi-class labels via one-vs-all scores.

    Parameters
    ----------
    classes : array of class labels, aligned with `thetas`.
    thetas : list of (n+1, 1) weight vectors, one per class.
    x : ndarray (m, n) of samples WITHOUT the bias column.

    Returns a list of predicted class labels, one per row of x.

    The sigmoid is strictly monotonic, so argmax over sigmoid(x @ theta)
    equals argmax over the raw scores -- the sigmoid call is dropped and
    the per-sample Python loop replaced by a single matrix product.
    """
    x = np.insert(x, 0, 1, axis=1)
    scores = x @ np.hstack(thetas)  # shape (m, n_classes)
    preds = np.argmax(scores, axis=1)
    return [classes[p] for p in preds]
def score(classes, theta, x, y):
    """Accuracy of the one-vs-all classifier on (x, y).

    Converts the prediction list explicitly to an ndarray instead of
    relying on implicit list == ndarray coercion. y is expected to be
    a 1-D label array so the comparison is element-wise.
    """
    pred = np.asarray(predict_classes(classes, theta, x))
    return (pred == np.asarray(y)).mean()
# Overall training accuracy of the one-vs-all classifier.
print(f"Train Accuracy: {score(classes, thetas, X, labels):.3f}")